/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include "Consumer.h"
#include "ConsumerConfig.h"
#include "Message.h"
#include "MessageId.h"
#include "ThreadSafeDeferred.h"
#include "LogUtils.h"
#include <pulsar/c/result.h>
#include <atomic>
#include <thread>
#include <future>
#include <sstream>

Napi::FunctionReference Consumer::constructor;

void Consumer::Init(Napi::Env env, Napi::Object exports) {
  Napi::HandleScope scope(env);

  Napi::Function func =
      DefineClass(env, "Consumer",
                  {
                      InstanceMethod("receive", &Consumer::Receive),
                      InstanceMethod("batchReceive", &Consumer::BatchReceive),
                      InstanceMethod("acknowledge", &Consumer::Acknowledge),
                      InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId),
                      InstanceMethod("negativeAcknowledge", &Consumer::NegativeAcknowledge),
                      InstanceMethod("negativeAcknowledgeId", &Consumer::NegativeAcknowledgeId),
                      InstanceMethod("acknowledgeCumulative", &Consumer::AcknowledgeCumulative),
                      InstanceMethod("acknowledgeCumulativeId", &Consumer::AcknowledgeCumulativeId),
                      InstanceMethod("seek", &Consumer::Seek),
                      InstanceMethod("seekTimestamp", &Consumer::SeekTimestamp),
                      InstanceMethod("isConnected", &Consumer::IsConnected),
                      InstanceMethod("close", &Consumer::Close),
                      InstanceMethod("unsubscribe", &Consumer::Unsubscribe),
                  });

  constructor = Napi::Persistent(func);
  constructor.SuppressDestruct();
}

struct MessageListenerProxyData {
  std::shared_ptr<pulsar_message_t> cMessage;
  Consumer *consumer;
  std::function<void(void)> callback;

  MessageListenerProxyData(std::shared_ptr<pulsar_message_t> cMessage, Consumer *consumer,
                           std::function<void(void)> callback)
      : cMessage(cMessage), consumer(consumer), callback(callback) {}
};

inline void logMessageListenerError(Consumer *consumer, const char *err) {
  std::ostringstream ss;
  ss << "[" << consumer->GetTopic() << "][" << consumer->GetSubscriptionName()
     << "] Message listener error in processing message: " << err;
  LOG_ERROR(ss.str().c_str());
}

void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListenerProxyData *data) {
  Napi::Object msg = Message::NewInstance({}, data->cMessage);
  Consumer *consumer = data->consumer;

  Napi::Value ret;
  try {
    ret = jsCallback.Call({msg, consumer->Value()});
  } catch (std::exception &exception) {
    logMessageListenerError(consumer, exception.what());
  }

  if (ret.IsPromise()) {
    Napi::Promise promise = ret.As<Napi::Promise>();
    Napi::Function catchFunc = promise.Get("catch").As<Napi::Function>();

    ret = catchFunc.Call(promise, {Napi::Function::New(env, [consumer](const Napi::CallbackInfo &info) {
                           Napi::Error error = info[0].As<Napi::Error>();
                           logMessageListenerError(consumer, error.what());
                         })});

    promise = ret.As<Napi::Promise>();
    Napi::Function finallyFunc = promise.Get("finally").As<Napi::Function>();

    finallyFunc.Call(
        promise, {Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); })});
    return;
  }
  data->callback();
}

void MessageListener(pulsar_consumer_t *rawConsumer, pulsar_message_t *rawMessage, void *ctx) {
  std::shared_ptr<pulsar_message_t> cMessage(rawMessage, pulsar_message_free);
  MessageListenerCallback *listenerCallback = (MessageListenerCallback *)ctx;

  Consumer *consumer = static_cast<Consumer *>(listenerCallback->consumerFuture.get());

  if (listenerCallback->callback.Acquire() != napi_ok) {
    return;
  }

  std::promise<void> promise;
  std::future<void> future = promise.get_future();
  std::unique_ptr<MessageListenerProxyData> dataPtr(
      new MessageListenerProxyData(cMessage, consumer, [&promise]() { promise.set_value(); }));
  listenerCallback->callback.BlockingCall(dataPtr.get(), MessageListenerProxy);
  listenerCallback->callback.Release();

  future.wait();
}

void Consumer::SetCConsumer(std::shared_ptr<pulsar_consumer_t> cConsumer) { this->cConsumer = cConsumer; }
void Consumer::SetListenerCallback(MessageListenerCallback *listener) {
  if (this->listener != nullptr) {
    // It is only safe to set the listener once for the lifecycle of the Consumer
    return;
  }

  if (listener != nullptr) {
    listener->consumerPromise.set_value(this);
    // If a consumer listener is set, the Consumer instance is kept alive even if it goes out of scope in JS
    // code.
    this->Ref();
    this->listener = listener;
  }
}

Consumer::Consumer(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Consumer>(info), listener(nullptr) {}

struct ConsumerNewInstanceContext {
  ConsumerNewInstanceContext(std::shared_ptr<ThreadSafeDeferred> deferred,
                             std::shared_ptr<pulsar_client_t> cClient,
                             std::shared_ptr<ConsumerConfig> consumerConfig)
      : deferred(deferred), cClient(cClient), consumerConfig(consumerConfig){};
  std::shared_ptr<ThreadSafeDeferred> deferred;
  std::shared_ptr<pulsar_client_t> cClient;
  std::shared_ptr<ConsumerConfig> consumerConfig;

  static void subscribeCallback(pulsar_result result, pulsar_consumer_t *rawConsumer, void *ctx) {
    auto instanceContext = static_cast<ConsumerNewInstanceContext *>(ctx);
    auto deferred = instanceContext->deferred;
    auto cClient = instanceContext->cClient;
    auto consumerConfig = instanceContext->consumerConfig;
    delete instanceContext;

    if (result != pulsar_result_Ok) {
      return deferred->Reject(std::string("Failed to create consumer: ") + pulsar_result_str(result));
    }

    auto cConsumer = std::shared_ptr<pulsar_consumer_t>(rawConsumer, pulsar_consumer_free);
    auto listener = consumerConfig->GetListenerCallback();

    deferred->Resolve([cConsumer, consumerConfig, listener](const Napi::Env env) {
      Napi::Object obj = Consumer::constructor.New({});
      Consumer *consumer = Consumer::Unwrap(obj);

      consumer->SetCConsumer(cConsumer);
      consumer->SetListenerCallback(listener);

      return obj;
    });
  }
};

Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient) {
  auto deferred = ThreadSafeDeferred::New(info.Env());
  auto config = info[0].As<Napi::Object>();
  std::shared_ptr<ConsumerConfig> consumerConfig = std::make_shared<ConsumerConfig>();

  consumerConfig->InitConfig(deferred, config, &MessageListener);
  if (deferred->IsDone()) {
    return deferred->Promise();
  }

  const std::string &topic = consumerConfig->GetTopic();
  const std::vector<std::string> &topics = consumerConfig->GetTopics();
  const std::string &topicsPattern = consumerConfig->GetTopicsPattern();
  const std::string &subscription = consumerConfig->GetSubscription();

  auto ctx = new ConsumerNewInstanceContext(deferred, cClient, consumerConfig);

  if (!topicsPattern.empty()) {
    pulsar_client_subscribe_pattern_async(cClient.get(), topicsPattern.c_str(), subscription.c_str(),
                                          consumerConfig->GetCConsumerConfig().get(),
                                          &ConsumerNewInstanceContext::subscribeCallback, ctx);
  } else if (topics.size() > 0) {
    const char **cTopics = new const char *[topics.size()];
    for (size_t i = 0; i < topics.size(); i++) {
      cTopics[i] = topics[i].c_str();
    }
    pulsar_client_subscribe_multi_topics_async(cClient.get(), cTopics, topics.size(), subscription.c_str(),
                                               consumerConfig->GetCConsumerConfig().get(),
                                               &ConsumerNewInstanceContext::subscribeCallback, ctx);
    delete[] cTopics;
  } else {
    pulsar_client_subscribe_async(cClient.get(), topic.c_str(), subscription.c_str(),
                                  consumerConfig->GetCConsumerConfig().get(),
                                  &ConsumerNewInstanceContext::subscribeCallback, ctx);
  }

  return deferred->Promise();
}

std::string Consumer::GetTopic() { return {pulsar_consumer_get_topic(this->cConsumer.get())}; }

std::string Consumer::GetSubscriptionName() {
  return {pulsar_consumer_get_subscription_name(this->cConsumer.get())};
}

// We still need a receive worker because the c api is missing the equivalent async definition
class ConsumerReceiveWorker : public Napi::AsyncWorker {
 public:
  ConsumerReceiveWorker(const Napi::Promise::Deferred &deferred, std::shared_ptr<pulsar_consumer_t> cConsumer,
                        int64_t timeout = -1)
      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
        deferred(deferred),
        cConsumer(cConsumer),
        timeout(timeout) {}
  ~ConsumerReceiveWorker() {}
  void Execute() {
    pulsar_result result;
    pulsar_message_t *rawMessage;
    if (timeout > 0) {
      result = pulsar_consumer_receive_with_timeout(this->cConsumer.get(), &rawMessage, timeout);
    } else {
      result = pulsar_consumer_receive(this->cConsumer.get(), &rawMessage);
    }

    if (result != pulsar_result_Ok) {
      SetError(std::string("Failed to receive message: ") + pulsar_result_str(result));
    } else {
      this->cMessage = std::shared_ptr<pulsar_message_t>(rawMessage, pulsar_message_free);
    }
  }
  void OnOK() {
    Napi::Object obj = Message::NewInstance({}, this->cMessage);
    this->deferred.Resolve(obj);
  }
  void OnError(const Napi::Error &e) { this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); }

 private:
  Napi::Promise::Deferred deferred;
  std::shared_ptr<pulsar_consumer_t> cConsumer;
  std::shared_ptr<pulsar_message_t> cMessage;
  int64_t timeout;
};

Napi::Value Consumer::BatchReceive(const Napi::CallbackInfo &info) {
  auto deferred = ThreadSafeDeferred::New(Env());
  auto ctx = new ExtDeferredContext(deferred);
  pulsar_consumer_batch_receive_async(
      this->cConsumer.get(),
      [](pulsar_result result, pulsar_messages_t *rawMessages, void *ctx) {
        auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
        auto deferred = deferredContext->deferred;
        delete deferredContext;

        if (result != pulsar_result_Ok) {
          deferred->Reject(std::string("Failed to batch receive message: ") + pulsar_result_str(result));
        } else {
          deferred->Resolve([rawMessages](const Napi::Env env) {
            int listSize = pulsar_messages_size(rawMessages);
            Napi::Array jsArray = Napi::Array::New(env, listSize);
            for (int i = 0; i < listSize; i++) {
              pulsar_message_t *rawMessage = pulsar_messages_get(rawMessages, i);
              pulsar_message_t *message = pulsar_message_create();
              pulsar_message_copy(rawMessage, message);
              Napi::Object obj =
                  Message::NewInstance({}, std::shared_ptr<pulsar_message_t>(message, pulsar_message_free));
              jsArray.Set(i, obj);
            }
            pulsar_messages_free(rawMessages);
            return jsArray;
          });
        }
      },
      ctx);
  return deferred->Promise();
}

Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
  if (info[0].IsUndefined()) {
    auto deferred = ThreadSafeDeferred::New(Env());
    auto ctx = new ExtDeferredContext(deferred);
    pulsar_consumer_receive_async(
        this->cConsumer.get(),
        [](pulsar_result result, pulsar_message_t *rawMessage, void *ctx) {
          auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
          auto deferred = deferredContext->deferred;
          delete deferredContext;

          if (result != pulsar_result_Ok) {
            deferred->Reject(std::string("Failed to receive message: ") + pulsar_result_str(result));
          } else {
            deferred->Resolve([rawMessage](const Napi::Env env) {
              Napi::Object obj = Message::NewInstance(
                  {}, std::shared_ptr<pulsar_message_t>(rawMessage, pulsar_message_free));
              return obj;
            });
          }
        },
        ctx);
    return deferred->Promise();
  } else {
    Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
    Napi::Number timeout = info[0].As<Napi::Object>().ToNumber();
    ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer, timeout.Int64Value());
    wk->Queue();
    return deferred.Promise();
  }
}

Napi::Value Consumer::Acknowledge(const Napi::CallbackInfo &info) {
  auto obj = info[0].As<Napi::Object>();
  auto msg = Message::Unwrap(obj);
  auto deferred = ThreadSafeDeferred::New(Env());
  auto ctx = new ExtDeferredContext(deferred);

  pulsar_consumer_acknowledge_async(
      this->cConsumer.get(), msg->GetCMessage().get(),
      [](pulsar_result result, void *ctx) {
        auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
        auto deferred = deferredContext->deferred;
        delete deferredContext;

        if (result != pulsar_result_Ok) {
          deferred->Reject(std::string("Failed to acknowledge: ") + pulsar_result_str(result));
        } else {
          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
        }
      },
      ctx);

  return deferred->Promise();
}

Napi::Value Consumer::AcknowledgeId(const Napi::CallbackInfo &info) {
  auto obj = info[0].As<Napi::Object>();
  auto *msgId = MessageId::Unwrap(obj);
  auto deferred = ThreadSafeDeferred::New(Env());
  auto ctx = new ExtDeferredContext(deferred);

  pulsar_consumer_acknowledge_async_id(
      this->cConsumer.get(), msgId->GetCMessageId().get(),
      [](pulsar_result result, void *ctx) {
        auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
        auto deferred = deferredContext->deferred;
        delete deferredContext;

        if (result != pulsar_result_Ok) {
          deferred->Reject(std::string("Failed to acknowledge id: ") + pulsar_result_str(result));
        } else {
          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
        }
      },
      ctx);

  return deferred->Promise();
}

void Consumer::NegativeAcknowledge(const Napi::CallbackInfo &info) {
  Napi::Object obj = info[0].As<Napi::Object>();
  Message *msg = Message::Unwrap(obj);
  std::shared_ptr<pulsar_message_t> cMessage = msg->GetCMessage();
  pulsar_consumer_negative_acknowledge(this->cConsumer.get(), cMessage.get());
}

void Consumer::NegativeAcknowledgeId(const Napi::CallbackInfo &info) {
  Napi::Object obj = info[0].As<Napi::Object>();
  MessageId *msgId = MessageId::Unwrap(obj);
  std::shared_ptr<pulsar_message_id_t> cMessageId = msgId->GetCMessageId();
  pulsar_consumer_negative_acknowledge_id(this->cConsumer.get(), cMessageId.get());
}

Napi::Value Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) {
  auto obj = info[0].As<Napi::Object>();
  auto *msg = Message::Unwrap(obj);
  auto deferred = ThreadSafeDeferred::New(Env());
  auto ctx = new ExtDeferredContext(deferred);

  pulsar_consumer_acknowledge_cumulative_async(
      this->cConsumer.get(), msg->GetCMessage().get(),
      [](pulsar_result result, void *ctx) {
        auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
        auto deferred = deferredContext->deferred;
        delete deferredContext;

        if (result != pulsar_result_Ok) {
          deferred->Reject(std::string("Failed to acknowledge cumulatively: ") + pulsar_result_str(result));
        } else {
          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
        }
      },
      ctx);

  return deferred->Promise();
}

Napi::Value Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) {
  auto obj = info[0].As<Napi::Object>();
  auto *msgId = MessageId::Unwrap(obj);
  auto deferred = ThreadSafeDeferred::New(Env());
  auto ctx = new ExtDeferredContext(deferred);

  pulsar_consumer_acknowledge_cumulative_async_id(
      this->cConsumer.get(), msgId->GetCMessageId().get(),
      [](pulsar_result result, void *ctx) {
        auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
        auto deferred = deferredContext->deferred;
        delete deferredContext;

        if (result != pulsar_result_Ok) {
          deferred->Reject(std::string("Failed to acknowledge cumulatively by id: ") +
                           pulsar_result_str(result));
        } else {
          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
        }
      },
      ctx);

  return deferred->Promise();
}

Napi::Value Consumer::Seek(const Napi::CallbackInfo &info) {
  auto obj = info[0].As<Napi::Object>();
  auto *msgId = MessageId::Unwrap(obj);
  auto deferred = ThreadSafeDeferred::New(Env());
  auto ctx = new ExtDeferredContext(deferred);

  pulsar_consumer_seek_async(
      this->cConsumer.get(), msgId->GetCMessageId().get(),
      [](pulsar_result result, void *ctx) {
        auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
        auto deferred = deferredContext->deferred;
        delete deferredContext;

        if (result != pulsar_result_Ok) {
          deferred->Reject(std::string("Failed to seek message by id: ") + pulsar_result_str(result));
        } else {
          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
        }
      },
      ctx);

  return deferred->Promise();
}

Napi::Value Consumer::SeekTimestamp(const Napi::CallbackInfo &info) {
  Napi::Number timestamp = info[0].As<Napi::Object>().ToNumber();
  auto deferred = ThreadSafeDeferred::New(Env());
  auto ctx = new ExtDeferredContext(deferred);

  pulsar_consumer_seek_by_timestamp_async(
      this->cConsumer.get(), timestamp.Int64Value(),
      [](pulsar_result result, void *ctx) {
        auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
        auto deferred = deferredContext->deferred;
        delete deferredContext;

        if (result != pulsar_result_Ok) {
          deferred->Reject(std::string("Failed to seek message by timestamp: ") + pulsar_result_str(result));
        } else {
          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
        }
      },
      ctx);

  return deferred->Promise();
}

Napi::Value Consumer::IsConnected(const Napi::CallbackInfo &info) {
  Napi::Env env = info.Env();
  return Napi::Boolean::New(env, pulsar_consumer_is_connected(this->cConsumer.get()));
}

void Consumer::Cleanup() {
  if (this->listener != nullptr) {
    pulsar_consumer_pause_message_listener(this->cConsumer.get());
    this->listener->callback.Release();
    this->listener = nullptr;
    this->Unref();
  }
}

Napi::Value Consumer::Close(const Napi::CallbackInfo &info) {
  auto deferred = ThreadSafeDeferred::New(Env());
  auto ctx = new ExtDeferredContext(deferred);
  this->Cleanup();

  pulsar_consumer_close_async(
      this->cConsumer.get(),
      [](pulsar_result result, void *ctx) {
        auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
        auto deferred = deferredContext->deferred;
        delete deferredContext;

        if (result != pulsar_result_Ok) {
          deferred->Reject(std::string("Failed to close consumer: ") + pulsar_result_str(result));
        } else {
          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
        }
      },
      ctx);

  return deferred->Promise();
}

Napi::Value Consumer::Unsubscribe(const Napi::CallbackInfo &info) {
  auto deferred = ThreadSafeDeferred::New(Env());
  auto ctx = new ExtDeferredContext(deferred);

  pulsar_consumer_pause_message_listener(this->cConsumer.get());
  pulsar_consumer_unsubscribe_async(
      this->cConsumer.get(),
      [](pulsar_result result, void *ctx) {
        auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
        auto deferred = deferredContext->deferred;
        delete deferredContext;

        if (result != pulsar_result_Ok) {
          deferred->Reject(std::string("Failed to unsubscribe consumer: ") + pulsar_result_str(result));
        } else {
          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
        }
      },
      ctx);

  return deferred->Promise();
}

Consumer::~Consumer() { this->Cleanup(); }
